//
// Created by 29108 on 2025/7/5.
//

#include "common/thread_pool/thread_pool.h"
#include "common/logger/logger.h"

namespace common {
    namespace thread_pool {

        /**
         * Default constructor: delegates to the Config-taking constructor, using
         * the settings returned by Config::fromConfigManager().
         */
        ThreadPool::ThreadPool()
            : ThreadPool(Config::fromConfigManager()) {
            // Written to stdout instead of the logger to avoid a circular dependency.
            std::cout << "[ThreadPool] [" << __FILE__ << ":" << __LINE__
                      << "] Initialized with ConfigManager settings" << std::endl;
        }

        /**
         * Builds a pool from an explicit configuration.
         *
         * Copies the configuration into config_, calls validate() on it, starts
         * core_pool_size worker threads and, when enable_monitoring is set, the
         * monitoring thread.
         *
         * If starting a thread throws part-way through, the workers that were
         * already started are told to stop and are joined before the exception
         * is rethrown. Otherwise the partially filled workers_ vector would be
         * destroyed while still holding joinable std::thread objects, which
         * calls std::terminate().
         *
         * @param config pool settings; copied into config_.
         */
        ThreadPool::ThreadPool(const Config &config) : config_(config), stop_(false), active_threads_(0) {
            config_.validate();

            // Number of worker threads to start, taken from the configuration.
            const size_t thread_count = static_cast<size_t>(config_.core_pool_size);

            // Written to stdout instead of the logger to avoid a circular dependency.
            std::cout << "[ThreadPool] [" << __FILE__ << ":" << __LINE__ << "] Initializing with " << thread_count << " threads" << std::endl;

            try {
                // Start the worker threads.
                workers_.reserve(thread_count);
                for (size_t i = 0; i < thread_count; ++i) {
                    workers_.emplace_back([this] { workerThread(); });
                }

                // Start the monitoring thread when monitoring is enabled.
                if (config_.enable_monitoring) {
                    startMonitoring();
                }
            } catch (...) {
                // Roll back: no task can have been queued yet, so the started
                // workers exit as soon as they observe stop_.
                {
                    std::lock_guard<std::mutex> lock(queue_mutex_);
                    stop_ = true;
                }
                condition_.notify_all();
                for (auto& worker : workers_) {
                    if (worker.joinable()) {
                        worker.join();
                    }
                }
                throw;
            }

            // Written to stdout instead of the logger to avoid a circular dependency.
            std::cout << "[ThreadPool] [" << __FILE__ << ":" << __LINE__ << "] Initialized successfully" << std::endl;
        }
        /**
         * Destructor: performs a graceful shutdown() unless the pool is already
         * shut down. If the graceful path throws, the failure is reported on
         * stderr and forceShutdown() is attempted. Nothing is allowed to
         * propagate out of the destructor.
         */
        ThreadPool::~ThreadPool() {
            // Fallback shared by both handlers; any exception it raises is swallowed
            // because a destructor must not throw.
            const auto forceQuietly = [this]() {
                try {
                    forceShutdown();
                } catch (...) {
                }
            };

            try {
                if (isShutdown()) {
                    return;
                }
                // Written to stdout instead of the logger to avoid a circular dependency.
                std::cout << "[ThreadPool] [" << __FILE__ << ":" << __LINE__ << "] Destructor initiating shutdown" << std::endl;
                shutdown();
            } catch (const std::exception& err) {
                std::cerr << "[ThreadPool] [" << __FILE__ << ":" << __LINE__ << "] Exception during destruction: " << err.what() << std::endl;
                // Graceful shutdown failed, fall back to the forced one.
                forceQuietly();
            } catch (...) {
                std::cerr << "[ThreadPool] [" << __FILE__ << ":" << __LINE__ << "] Unknown exception during destruction" << std::endl;
                forceQuietly();
            }
        }

        /** Returns the number of tasks currently in tasks_, read while holding queue_mutex_. */
        size_t ThreadPool::getQueueSize() const {
            const std::lock_guard<std::mutex> guard(queue_mutex_);
            const size_t pending = tasks_.size();
            return pending;
        }

        /** Returns the current value of the active_threads_ counter. */
        size_t ThreadPool::getActiveThreadCount() const {
            const auto running = active_threads_.load();
            return running;
        }

        /**
         * Returns the number of entries in workers_.
         *
         * NOTE(review): workers_ is read here without queue_mutex_, while
         * adjustCorePoolSize() appends to it under that mutex; confirm that
         * callers tolerate this.
         */
        size_t ThreadPool::getTotalThreadCount() const {
            const size_t total = workers_.size();
            return total;
        }

        /**
         * Gracefully shuts the pool down.
         *
         * Sequence:
         *  1. Set stop_ under queue_mutex_; if it was already set, report and return.
         *  2. Stop the monitoring thread when monitoring is enabled.
         *  3. Wake every worker through condition_.
         *  4. Join each joinable worker. A 30 second budget is checked before
         *     each join; once it is used up the remaining workers are detached
         *     instead. The join() call itself is not time-limited.
         *  5. Drop whatever tasks are still queued.
         *  6. Zero the bookkeeping counters.
         *  7. Report the elapsed time and how many workers were detached.
         *
         * Progress goes to std::cout / std::cerr rather than the logger to
         * avoid a circular dependency.
         */
        void ThreadPool::shutdown() {
            std::cout << "[ThreadPool] [" << __FILE__ << ":" << __LINE__ << "] Shutdown initiated" << std::endl;

            // 1. Only the first caller proceeds; later callers report and leave.
            {
                std::unique_lock<std::mutex> guard(queue_mutex_);
                if (stop_) {
                    std::cerr << "[ThreadPool] [" << __FILE__ << ":" << __LINE__ << "] Already shutdown" << std::endl;
                    return;
                }
                stop_ = true;
            }

            // 2. Stop the monitoring thread; a failure here does not abort the shutdown.
            try {
                if (monitoring_enabled_.load()) {
                    stopMonitoring();
                    std::cout << "[ThreadPool] [" << __FILE__ << ":" << __LINE__ << "] Monitoring stopped during shutdown" << std::endl;
                }
            } catch (const std::exception& err) {
                std::cerr << "[ThreadPool] [" << __FILE__ << ":" << __LINE__ << "] Error stopping monitoring: " << err.what() << std::endl;
            }

            // 3. Wake all workers so they can observe stop_.
            std::cout << "[ThreadPool] Notifying all worker threads to stop..." << std::endl;
            condition_.notify_all();

            // 4. Join the workers within the time budget.
            const auto started_at = std::chrono::steady_clock::now();
            const auto wait_budget = std::chrono::seconds(30);
            std::vector<std::thread::id> abandoned;

            // Records a worker whose join failed and detaches it, ignoring detach errors.
            const auto abandon = [&abandoned](auto& thread) {
                abandoned.push_back(thread.get_id());
                try {
                    thread.detach();
                } catch (...) {
                }
            };

            std::cout << "[ThreadPool] Starting to join " << workers_.size() << " worker threads..." << std::endl;

            for (size_t index = 0; index < workers_.size(); ++index) {
                auto& worker = workers_[index];
                const size_t ordinal = index + 1;
                std::cout << "[ThreadPool] Processing worker thread " << ordinal << "/" << workers_.size() << std::endl;

                if (!worker.joinable()) {
                    continue;
                }

                std::cout << "[ThreadPool] Worker thread " << ordinal << " is joinable, attempting to join..." << std::endl;
                const auto spent = std::chrono::duration_cast<std::chrono::milliseconds>(
                    std::chrono::steady_clock::now() - started_at);

                if (spent >= wait_budget) {
                    // Budget exhausted: stop waiting and let the thread go.
                    std::cerr << "[ThreadPool] [" << __FILE__ << ":" << __LINE__ << "] Shutdown timeout reached, detaching remaining threads" << std::endl;
                    abandoned.push_back(worker.get_id());
                    worker.detach();
                    continue;
                }

                try {
                    std::cout << "[ThreadPool] About to join worker thread " << ordinal << "..." << std::endl;
                    worker.join();
                    std::cout << "[ThreadPool] Worker thread " << ordinal << " joined successfully" << std::endl;
                } catch (const std::exception& err) {
                    std::cerr << "[ThreadPool] [" << __FILE__ << ":" << __LINE__ << "] Worker thread join failed: " << err.what() << std::endl;
                    abandon(worker);
                } catch (...) {
                    std::cerr << "[ThreadPool] [" << __FILE__ << ":" << __LINE__ << "] Worker thread join failed with unknown exception" << std::endl;
                    abandon(worker);
                }
            }

            // 5. Discard tasks that were never picked up.
            size_t dropped = 0;
            {
                std::unique_lock<std::mutex> guard(queue_mutex_);
                dropped = tasks_.size();
                for (; !tasks_.empty(); tasks_.pop()) {
                }
            }

            if (dropped > 0) {
                std::cerr << "[ThreadPool] [" << __FILE__ << ":" << __LINE__ << "] Discarded " << dropped <<
                            " pending tasks during shutdown" << std::endl;
            }

            // 6. Reset the counters.
            active_threads_ = 0;
            core_threads_to_reduce_ = 0;
            threads_to_terminate_ = 0;

            // 7. Report the outcome.
            const auto took = std::chrono::duration_cast<std::chrono::milliseconds>(
                std::chrono::steady_clock::now() - started_at);

            if (!abandoned.empty()) {
                std::cerr << "[ThreadPool] [" << __FILE__ << ":" << __LINE__ << "] Shutdown completed with " << abandoned.size() <<
                            " threads detached in " << took.count() << "ms" << std::endl;
            } else {
                std::cout << "[ThreadPool] [" << __FILE__ << ":" << __LINE__ << "] Shutdown completed successfully in " <<
                            took.count() << "ms" << std::endl;
            }
        }

        void ThreadPool::forceShutdown() {
            // 避免循环依赖，使用标准错误输出
            std::cerr << "[ThreadPool] [" << __FILE__ << ":" << __LINE__ << "] Force shutdown initiated" << std::endl;

            // 1. 立即设置停止标志
            {
                std::unique_lock<std::mutex> lock(queue_mutex_);
                stop_ = true;
            }

            // 2. 强制停止监控
            try {
                if (monitoring_enabled_.load()) {
                    stopMonitoring();
                }
            } catch (...) {
                // 忽略强制关闭时的异常
            }

            // 3. 通知所有线程
            condition_.notify_all();

            // 4. 立即分离所有线程（不等待）
            for (auto& worker : workers_) {
                if (worker.joinable()) {
                    worker.detach();
                }
            }

            // 5. 清理队列
            {
                std::unique_lock<std::mutex> lock(queue_mutex_);
                size_t discarded = tasks_.size();
                while (!tasks_.empty()) {
                    tasks_.pop();
                }
                if (discarded > 0) {
                    // 避免在关闭时使用日志系统，使用标准错误输出
                    std::cerr << "[ThreadPool] Force shutdown discarded " << discarded << " pending tasks" << std::endl;
                }
            }

            // 6. 重置统计
            active_threads_ = 0;
            core_threads_to_reduce_ = 0;
            threads_to_terminate_ = 0;

            // 避免在关闭时使用日志系统，使用标准输出
            std::cout << "[ThreadPool] Force shutdown completed" << std::endl;
        }

        bool ThreadPool::isShutdown() const {
            return stop_.load();
        }

        /**
         * Registers change listeners on the ConfigManager singleton for
         * "thread_pool.core_pool_size" and "thread_pool.maximum_pool_size".
         * Each listener logs the change, parses the new value with std::stoi
         * and forwards it to the matching adjust*PoolSize() method; any
         * std::exception raised while doing so is logged and swallowed.
         *
         * NOTE(review): both listeners capture `this`, and no listener removal
         * is visible in this file; confirm the pool outlives the registration.
         */
        void ThreadPool::enableConfigHotReload() {
            auto& manager = common::config::ConfigManager::getInstance();

            // React to changes of the core thread count.
            manager.addChangeListener(
                "thread_pool.core_pool_size",
                [this](const auto& /*key*/, const auto& previous, const auto& current) {
                    LOG_INFO("ThreadPool core_pool_size changed from " + previous + " to " + current);
                    try {
                        const int parsed = std::stoi(current);
                        this->adjustCorePoolSize(parsed);
                    } catch (const std::exception& err) {
                        LOG_ERROR("Failed to adjust core pool size: " + std::string(err.what()));
                    }
                });

            // React to changes of the maximum thread count.
            manager.addChangeListener(
                "thread_pool.maximum_pool_size",
                [this](const auto& /*key*/, const auto& previous, const auto& current) {
                    LOG_INFO("ThreadPool maximum_pool_size changed from " + previous + " to " + current);
                    try {
                        const int parsed = std::stoi(current);
                        this->adjustMaximumPoolSize(parsed);
                    } catch (const std::exception& err) {
                        LOG_ERROR("Failed to adjust maximum pool size: " + std::string(err.what()));
                    }
                });
        }

        /**
         * Main loop executed by every worker thread.
         *
         * Each iteration, under queue_mutex_:
         *  - exits if a scale-down is pending (threads_to_terminate_ > 0),
         *  - consumes one pending core_threads_to_reduce_ request, if any,
         *  - waits up to keep_alive_time_ms for a task, a stop request or a
         *    scale-down request,
         *  - exits if the pool is stopping and the queue is empty,
         *  - otherwise takes the next task, if there is one.
         * The task then runs outside the lock; exceptions thrown by it are
         * reported on stderr and do not end the thread.
         *
         * The wait predicate includes threads_to_terminate_ so that the
         * notify_all() issued by adjustMaximumPoolSize() wakes idle workers
         * immediately. Previously they only noticed a scale-down request after
         * the keep-alive timeout expired.
         */
        void ThreadPool::workerThread() {
            while (true) {
                std::function<void()> task;

                {
                    std::unique_lock<std::mutex> lock(queue_mutex_);

                    // A scale-down asked some threads to exit: this one volunteers.
                    if (threads_to_terminate_ > 0) {
                        threads_to_terminate_--;
                        // Written to stdout instead of the logger, which may be unavailable during shutdown.
                        std::cout << "[ThreadPool] Worker thread terminating (scale down)" << std::endl;
                        return;
                    }

                    // Consume one pending core-to-non-core request.
                    if (core_threads_to_reduce_ > 0) {
                        core_threads_to_reduce_--;
                        // Placeholder: the core -> non-core conversion itself is not implemented here.
                    }

                    // Wait for work, a stop request, a scale-down request, or the keep-alive timeout.
                    condition_.wait_for(lock, std::chrono::milliseconds(config_.keep_alive_time_ms), [this] {
                        return stop_ || !tasks_.empty() || threads_to_terminate_ > 0;
                    });

                    // Stop only after the queue has been drained.
                    if (stop_ && tasks_.empty()) {
                        return;
                    }

                    if (!tasks_.empty()) {
                        task = std::move(tasks_.front());
                        tasks_.pop();
                    }
                }

                // Run the task without holding the queue lock.
                if (task) {
                    active_threads_++;
                    try {
                        task();
                    } catch (const std::exception& e) {
                        std::cerr << "[ThreadPool] Task execution failed: " << e.what() << std::endl;
                    } catch (...) {
                        std::cerr << "[ThreadPool] Task execution failed with unknown exception" << std::endl;
                    }
                    active_threads_--;
                }
            }
        }

        void ThreadPool::adjustCorePoolSize(int new_core_size) {
            if (new_core_size <= 0) {
                LOG_WARNING("Invalid core pool size: " + std::to_string(new_core_size));
                return;
            }

            if (new_core_size > config_.maximum_pool_size) {
                LOG_WARNING("Core pool size (" + std::to_string(new_core_size) +
                           ") cannot exceed maximum pool size (" + std::to_string(config_.maximum_pool_size) + ")");
                return;
            }

            std::lock_guard<std::mutex> lock(queue_mutex_);

            int old_core_size = config_.core_pool_size;
            if (new_core_size == old_core_size) {
                LOG_DEBUG("Core pool size unchanged: " + std::to_string(new_core_size));
                return;
            }

            try {
                config_.core_pool_size = new_core_size;

                if (new_core_size > old_core_size) {
                    // 扩容：创建新的核心线程
                    int threads_to_add = new_core_size - old_core_size;
                    for (int i = 0; i < threads_to_add; ++i) {
                        workers_.emplace_back([this] { workerThread(); });
                    }
                    LOG_INFO("Expanded core pool size from " + std::to_string(old_core_size) +
                            " to " + std::to_string(new_core_size));
                } else {
                    // 缩容：标记多余的核心线程为非核心线程
                    core_threads_to_reduce_ = old_core_size - new_core_size;
                    condition_.notify_all(); // 通知线程检查状态

                    LOG_INFO("Reduced core pool size from " + std::to_string(old_core_size) +
                            " to " + std::to_string(new_core_size));
                }

            } catch (const std::exception& e) {
                LOG_ERROR("Failed to adjust core pool size: " + std::string(e.what()));
                config_.core_pool_size = old_core_size; // 回滚
                throw;
            }
        }

        /**
         * Changes the maximum pool size at runtime.
         *
         * The request is ignored (with a warning) when new_max_size is not
         * positive or is smaller than the current core pool size. When the
         * maximum shrinks below the number of entries in workers_, the surplus
         * is stored in threads_to_terminate_ and the workers are woken.
         *
         * The core-size comparison is performed while holding queue_mutex_,
         * because adjustCorePoolSize() writes config_.core_pool_size under that
         * mutex; reading it unlocked is a data race.
         *
         * NOTE(review): workers_.size() is used as the current thread count,
         * but no code in this file removes workers that already exited after a
         * scale-down, so the count may include finished threads. Confirm.
         *
         * @param new_max_size requested maximum number of threads.
         */
        void ThreadPool::adjustMaximumPoolSize(int new_max_size) {
            if (new_max_size <= 0) {
                LOG_WARNING("Invalid maximum pool size: " + std::to_string(new_max_size));
                return;
            }

            std::unique_lock<std::mutex> lock(queue_mutex_);

            const auto core_pool_size = config_.core_pool_size;
            if (new_max_size < core_pool_size) {
                // Release the lock before logging, matching the original call order.
                lock.unlock();
                LOG_WARNING("Maximum pool size (" + std::to_string(new_max_size) +
                           ") cannot be less than core pool size (" + std::to_string(core_pool_size) + ")");
                return;
            }

            int old_max_size = config_.maximum_pool_size;
            if (new_max_size == old_max_size) {
                LOG_DEBUG("Maximum pool size unchanged: " + std::to_string(new_max_size));
                return;
            }

            config_.maximum_pool_size = new_max_size;

            if (new_max_size < old_max_size) {
                // Shrink: ask the surplus threads to exit.
                int current_thread_count = static_cast<int>(workers_.size());
                if (current_thread_count > new_max_size) {
                    threads_to_terminate_ = current_thread_count - new_max_size;
                    condition_.notify_all();
                }
                // Written to stdout instead of the logger to avoid a circular dependency.
                std::cout << "[ThreadPool] [" << __FILE__ << ":" << __LINE__ << "] Reduced maximum pool size from " << old_max_size <<
                            " to " << new_max_size << std::endl;
            } else {
                // Written to stdout instead of the logger to avoid a circular dependency.
                std::cout << "[ThreadPool] [" << __FILE__ << ":" << __LINE__ << "] Increased maximum pool size from " << old_max_size <<
                            " to " << new_max_size << std::endl;
            }
        }

        /**
         * Starts the monitoring thread when config_.enable_monitoring is set.
         *
         * The thread calls reportPoolStatus() and then sleeps for
         * monitoring_interval_ms in chunks of at most 100 ms, re-checking
         * monitoring_enabled_ between chunks so that stopMonitoring() takes
         * effect quickly.
         *
         * Calling this while a monitoring thread is already running is a no-op:
         * assigning a new std::thread to a joinable one calls std::terminate().
         * If creating the thread throws, monitoring_enabled_ is reset before
         * the exception propagates, so the flag never claims a monitor that
         * does not exist.
         */
        void ThreadPool::startMonitoring() {
            if (!config_.enable_monitoring) {
                return;
            }

            // Already running: do not overwrite a joinable thread object.
            if (monitoring_thread_.joinable()) {
                return;
            }

            monitoring_enabled_ = true;
            try {
                monitoring_thread_ = std::thread([this]() {
                    // Written to stdout instead of the logger to avoid a circular dependency.
                    std::cout << "[ThreadPool] [" << __FILE__ << ":" << __LINE__ << "] Monitoring thread started" << std::endl;

                    while (monitoring_enabled_.load()) {
                        try {
                            reportPoolStatus();

                            // Sleep in short chunks to react quickly to a stop request.
                            auto interval = std::chrono::milliseconds(config_.monitoring_interval_ms);
                            auto sleep_chunk = std::chrono::milliseconds(100); // 100ms chunks

                            while (interval > std::chrono::milliseconds(0) && monitoring_enabled_.load()) {
                                auto sleep_time = std::min(interval, sleep_chunk);
                                std::this_thread::sleep_for(sleep_time);
                                interval -= sleep_time;
                            }
                        } catch (const std::exception& e) {
                            std::cerr << "[ThreadPool] [" << __FILE__ << ":" << __LINE__ << "] Error in monitoring thread: " << e.what() << std::endl;
                            // Re-check the stop flag after an error as well.
                            if (!monitoring_enabled_.load()) {
                                break;
                            }
                        }
                    }

                    std::cout << "[ThreadPool] [" << __FILE__ << ":" << __LINE__ << "] Monitoring thread stopped" << std::endl;
                });
            } catch (...) {
                monitoring_enabled_ = false;
                throw;
            }
        }

        void ThreadPool::stopMonitoring() {
            std::cout << "[ThreadPool] Stopping monitoring thread..." << std::endl;
            monitoring_enabled_ = false;

            if (monitoring_thread_.joinable()) {
                try {
                    std::cout << "[ThreadPool] Joining monitoring thread..." << std::endl;

                    // 给监控线程一些时间来响应停止信号
                    std::this_thread::sleep_for(std::chrono::milliseconds(200));

                    monitoring_thread_.join();
                    std::cout << "[ThreadPool] Monitoring thread joined successfully" << std::endl;
                } catch (const std::exception& e) {
                    std::cerr << "[ThreadPool] Error joining monitoring thread: " << e.what() << std::endl;
                    // 如果join失败，尝试detach
                    try {
                        monitoring_thread_.detach();
                        std::cerr << "[ThreadPool] Monitoring thread detached" << std::endl;
                    } catch (...) {
                        std::cerr << "[ThreadPool] Failed to detach monitoring thread" << std::endl;
                    }
                } catch (...) {
                    std::cerr << "[ThreadPool] Unknown error joining monitoring thread" << std::endl;
                    try {
                        monitoring_thread_.detach();
                        std::cerr << "[ThreadPool] Monitoring thread detached" << std::endl;
                    } catch (...) {
                        std::cerr << "[ThreadPool] Failed to detach monitoring thread" << std::endl;
                    }
                }
            } else {
                std::cout << "[ThreadPool] Monitoring thread was not joinable" << std::endl;
            }
        }

        /**
         * Prints a snapshot of the pool state.
         *
         * The counters and the relevant configuration values are copied while
         * holding queue_mutex_; the lock is released before anything is
         * written, so slow console I/O no longer blocks the worker threads.
         *
         * A status line is printed only when there are queued tasks or active
         * threads. When thread dumps are enabled and the queue is more than
         * 80% of queue_capacity, a warning with the fill percentage is printed.
         * The percentage is skipped when queue_capacity is not positive, which
         * avoids a division by zero.
         */
        void ThreadPool::reportPoolStatus() {
            std::unique_lock<std::mutex> lock(queue_mutex_);
            const int active_threads = static_cast<int>(active_threads_.load());
            const int total_threads = static_cast<int>(workers_.size());
            const int queue_size = static_cast<int>(tasks_.size());
            const auto thread_dump_enabled = config_.enable_thread_dump;
            const auto queue_capacity = config_.queue_capacity;
            lock.unlock();

            // Stay silent while the pool is idle.
            if (queue_size > 0 || active_threads > 0) {
                std::cout << "[ThreadPool] Status - Active: " << active_threads
                         << ", Total: " << total_threads
                         << ", Queue: " << queue_size << std::endl;
            }

            // Queue pressure warning, only when thread dumps are enabled.
            if (thread_dump_enabled && queue_capacity > 0 && queue_size > queue_capacity * 0.8) {
                // Written to stderr instead of the logger, which may be unavailable during shutdown.
                std::cerr << "[ThreadPool] Queue is " << ((queue_size * 100) / queue_capacity)
                         << "% full, consider increasing pool size" << std::endl;
            }
        }
    }
}
